3872511bfc0fadbbe02a7970ca0e891e9741e43a,cdap-app-templates/cdap-etl/cdap-etl-lib/src/main/java/co/cask/cdap/templates/etl/realtime/sources/JmsSource.java,JmsSource,poll,#Emitter#SourceState#,118
Before Change
return currentState;
}
writer.emit(text);
return new SourceState(currentState.getState());
}
After Change
// Try to get message from Queue
Message message = null;
int count = 0;
do {
try {
message = consumer.receive(JMS_CONSUMER_TIMEOUT_MS);
} catch (JMSException e) {
LOG.warn("Exception when trying to receive message from JMS consumer: {}", CDAP_JMS_SOURCE_NAME);
}
if (message != null) {
String text;
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
text = textMessage.getText();
LOG.trace("Process JMS TextMessage : ", text);
} else if (message instanceof BytesMessage) {
BytesMessage bytesMessage = (BytesMessage) message;
int bodyLength = (int) bytesMessage.getBodyLength();
byte[] data = new byte[bodyLength];
int bytesRead = bytesMessage.readBytes(data);
if (bytesRead != bodyLength) {
LOG.warn("Number of bytes read {} not same as expected {}", bytesRead, bodyLength);
}
text = new String(data).intern();
LOG.trace("Processing JMS ByteMessage : {}", text);
} else {
// Different kind of messages, just get String for now
// TODO Process different kind of JMS messages
text = message.toString();
LOG.trace("Processing JMS message : ", text);
}
} catch (JMSException e) {
LOG.error("Unable to read text from a JMS Message.");
continue;
}
writer.emit(text);
count++;
}
} while (message != null && count < messagesToReceive);
return new SourceState(currentState.getState());
}